package com.chenxu.gmall.realtime.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * Date: 2021/1/30
 * Desc: 操作Kafka的工具类
 */
public class MyKafkaUtil {
    private static String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static String DEFAULT_TOPIC = "DEFAULT_DATA";


    //获取FlinkKafkaConsumer
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
        //groupId代表消费者组
        //Kafka连接的一些属性配置
        Properties props = new Properties();
        //消费者组信息
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //Bootstrap-server相关信息
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        //new SimpleStringSchema()反序列化类型；
        //返回的就是kafka的consumer对象；
        return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props);
    }

    //封装FlinkKafkaProducer
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        //返回的就是kafka的producer对象；
        return new FlinkKafkaProducer<String>(KAFKA_SERVER, topic, new SimpleStringSchema());
    }

    /*
    两个创建 FlinkKafkaProducer 方法对比:
    前者给定确定的 Topic
    而后者除了缺省情况下会采用 DEFAULT_TOPIC，一般情况下可以根据不同的业务数据在 KafkaSerializationSchema 中通过方法实现。
     */
    //这种生成的FlinkKafkaProducer可以支持多个topic的情形；
    public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        //设置生产数据的超时时间
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,15*60*1000+"");
        //可以往默认的topic发送，也可以指定topic；
        //KafkaSerializationSchema 序列化的类型；
        //props 生产者配置信息；
        //FlinkKafkaProducer.Semantic 语义，指定精准一致性；
        //这样写可以发往多个主题；
        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    //拼接Kafka相关属性到DDL
    public static String getKafkaDDL(String topic,String groupId){
        String ddl="'connector' = 'kafka', " +
            " 'topic' = '"+topic+"',"   +
            " 'properties.bootstrap.servers' = '"+ KAFKA_SERVER +"', " +
            " 'properties.group.id' = '"+groupId+ "', " +
            "  'format' = 'json', " +
            "  'scan.startup.mode' = 'latest-offset'  ";
        return  ddl;
    }

}
